Snowflake×dbtを試してみた~Part3:パイプライン構築編その1~ #SnowflakeDB #dbt
※本エントリは、Snowflakeをより使いこなそう! Advent Calendar 2021の21日目の記事となります。
さがらです。
Snowflake公式のdbtと連携した時の機能を一通り試すことが出来るQUICKSTARTSに関して試してみた内容をまとめていきます。※この記事は「Part3:パイプライン構築編その1」となります。
この記事の内容について
Snowflake公式のQUICKSTARTSに、Accelerating Data Teams with dbt & SnowflakeというSnowflakeとdbtを組み合わせたときの利点を一通り試すことが出来るクイックスタートがあります。
こちらの内容について、以下の合計5本の構成で試してみた内容を書いていきます。
- Part1:基本設定編
- 「1.Overview」から「5. dbt Configuration」
- Part2:Project設定編
- 「6. dbt Project Configuration」
- Part3:パイプライン構築編その1 ※この記事です。
- 「7. Building dbt Data Pipelines」から「9. dbt pipelines - Intermediate」
- Part4:パイプライン構築編その2 ※12/22に公開予定、公開後にリンクも正常に動作します。
- 「10. dbt pipelines - Seeds」から「12. dbt pipelines - Facts」
- Part5:テスト&Doc&デプロイ編 ※12/23に公開予定、公開後にリンクも正常に動作します。
- 「13. dbt pipelines - Tests & Docs」から「16. Appendix」
この記事では、「パイプライン構築編その1」ということで「7. Building dbt Data Pipelines」から「9. dbt pipelines - Intermediate」の内容についてまとめていきます。
7. Building dbt Data Pipelines
ここでは、dbtを用いてどのようなパイプラインを構築していくかについて説明されています。公式の英文を引用すると、以下の4つに関するTransformを行うパイプラインを構築すると書いてあります。
- Stock trading history(株式取引履歴)
- Currency exchange rates(為替レート)
- Trading books(取引帳簿)
- Profit & Loss calculation(損益計算)
8. dbt pipelines - Sources & Staging
ここでは、使用するテーブルの定義と、今後のTransformのベースとなるステージングモデルを定義していきます。
使用するテーブルの定義
dbt sourcesを定義していきます。
sourcesを定義することで、記述するselect文のfrom句で{{ source(~~) }}
と記述することで参照するテーブルを指定できるようになります。
まず、定義用のファイルを作ります。
先に作成していたmodels/staging/knoema
フォルダの配下に、knoema_sources.yml
というファイルを作成します。使用するデータとしては、日次の為替レートと日次の米国取引履歴に関するテーブルです。
ファイルが出来たら、以下の内容をコピーして貼り付けし、右上のsave
を押します。
※これは補足説明ですが先頭でversion: 2
と記述しているのは、yamlファイルの構造に大きな変化がありその変化点がversion2のため、version:2の構造を使用することを明示的にしていることが理由です。詳細はこちらのDocをご確認ください。
version: 2 sources: - name: knoema_economy_data_atlas database: knoema_economy_data_atlas schema: economy tables: - name: exratescc2018 - name: usindssp2020
ここで一度コミットしておきます。コミットメッセージはset up knoema source file
と入れておきます。※内容はsourceファイルを定義した、ということがわかれば何でもOKです。
ステージングモデルの定義(1つ目)
続いて、ステージングモデルの定義をしていきます。
ステージングモデルは、それが表すソースのテーブルと一対一の関係を持ちます。テーブルの粒度は同じですが、カラムは何らかの方法で名前を変えたり、キャストし直したり、一貫性のある有用なフォーマットに従うような処理を施しています。これらを最初に作成しておくことで、より複雑な変換を構築するための一貫した基盤を構築することができます。
ステージングモデルに関するdbtの見解については、こちらのページも併せてご確認ください。
では、ステージングモデルを作っていきます。まずはexratescc2018
テーブルに関して作成します。
models/staging/knoema
フォルダの配下に、stg_knoema_fx_rates.sql
というファイルを作ります。
ファイルが出来たら、下記の内容をコピーして貼り付けして右上のsave
を押します。
with source as ( select * from {{source('knoema_economy_data_atlas','exratescc2018') }} ), renamed as ( select "Currency" as currency, "Currency Unit" as currency_unit, "Frequency" as frequency, "Date" as exchange_date, "Value" as exchange_value, "Indicator" as indicator, "Indicator Name" as indicator_name, 'Knoema.FX Rates' as data_source_name from source ) select * from renamed
この上で、compile
ボタンを押してください。すると、stg_knoema_fx_rates.sql
で記述した{{source('knoema_economy_data_atlas','exratescc2018') }}
がknoema_economy_data_atlas.economy.exratescc2018
に変化していることがわかると思います。
これが、事前に作成したknoema_sources.yml
ファイルの効力です。
ここで、sourceを事前に定義することによるメリットについて2つ触れておきます。
まず1つ目、Lineage
を押すと、対象のmodelがsourceとどう関連しているかをDAGの形式で表示してくれます。
続いて2つ目、見て頂いたとおりコードを変更することなく動的に参照先を変更できるため、開発環境と本番環境でデータベースやスキーマを分けていた場合でも同じコードを使用することが出来ますし、target jinja関数などを使用すれば、条件に沿って使用するデータベースを定義することも可能です。
ステージングモデルの定義(2つ目)
続いて、usindssp2020
テーブルに関するステージングモデルを作成していきます。
models/staging/knoema
フォルダの配下に、stg_knoema_stock_history.sql
というファイルを作ります。
ファイルが出来たら、下記の内容をコピーして貼り付けして右上のsave
を押します。
with source as ( select * from {{source('knoema_economy_data_atlas','usindssp2020')}} ), renamed as ( select "Company" as company, "Company Name" as company_name, "Company Symbol" as company_symbol, "Stock Exchange" as stock_exchange, "Stock Exchange Name" as stock_exchange_name, "Indicator" as indicator, "Indicator Name" as indicator_name, "Units" as units, "Scale" as scale, "Frequency" as frequency, "Date" as stock_date, "Value" as stock_value, 'Knoema.Stock History' as data_source_name from source ) select * from renamed
定義したステージングモデルの実行
ここまでに2つのステージングモデルの定義を行ったので、早速実行してビューを作ってみましょう。
コマンドラインでdbt run -m staging.*
と入力し、実行してください。-m
の後に実行したいmodelがあるフォルダを指定して.*
を付けると、その配下にあるmodelだけを実行してくれます。
実行後、下図のようにログが表示されればOKです。
正しく実行できたならば、その状態で一度Commitしておきましょう。コミットメッセージは任意ですが、set up staging models
のように「ステージングモデルを定義したよ」ということがわかるように指定すると良いと思います。
続いて、モデルを実行して作られたビューに対してクエリを実行してみます。
下図の手順に沿って、新しいStatementを追加し、下記SQLを貼り付けてスキーマ名を自身のものに変更し、Preview
を押してみてください。
select * from pc_dbt_db.<dev_schema>_staging.stg_knoema_stock_history where company_symbol ='AAPL' and stock_date ='2021-03-01'
現在のデータを見ると、終値(Close)、始値(Open)、高値(High)、安値(Low)が別の行として定義されています。
このため、下図のように列で分けて見ることができるように、次章以降で変換処理を構築していきます。
9. dbt pipelines - Intermediate
ここでは、目的のテーブルを出力するための中間テーブル/ビューを構築するmodelの定義と、自動生成されるDocについても少し確認していきます。
ピボットを用いた中間modelの定義
まず、新しいファイルを作ります。フォルダも併せて作るため、任意のフォルダからNew File
を押して、入力欄を全てmodels/marts/core/intermediate/int_knoema_stock_history.sql
に書き換えた上で、ファイルを作成してください。
作成したファイルに対して、以下のクエリをコピーして貼り付けし、右上のsave
を押してください。
ここでは、dbt_utils
のpivot
マクロを使って、データセットを行から列に転置する処理を書いています。また、列の値を動的にリストアップするために、get_columns
という別のdbt_utils
のマクロをネストしています。
with stock_history as ( select * from {{ ref('stg_knoema_stock_history') }} where indicator_name in ('Close', 'Open','High','Low', 'Volume', 'Change %') ), pivoted as ( select company_symbol, company_name, stock_exchange_name, stock_date, data_source_name, {{ dbt_utils.pivot( column = 'indicator_name', values = dbt_utils.get_column_values(ref('stg_knoema_stock_history'), 'indicator_name'), then_value = 'stock_value' ) }} from stock_history group by company_symbol, company_name, stock_exchange_name, stock_date, data_source_name ) select * from pivoted
この上でCompile
ボタンを押すと、このマクロがどういったSQLに変換されているかがわかります。具体的には、case
文を駆使して該当する場合はその行のstock_value
の値を持ってきて、行数を圧縮するためにSUMで集計しているようですね。
続いて、dbt_utils
マクロを使わずにコードを書くとどうなるかを見てみましょう。同じフォルダに新規ファイルを作成します。任意のフォルダからNew File
を押して、入力欄を全てmodels/marts/core/intermediate/int_knoema_stock_history_alt.sql
に書き換えた上で、ファイルを作成してください。
こちらについても作成したファイルに対して、以下のクエリをコピーして貼り付けし、右上のsave
を押してください。
こちらは、Snowflakeのpivot
関数を使用した場合のクエリになります。
with stock_history as ( select * from {{ ref('stg_knoema_stock_history') }} where indicator_name IN ('Close', 'Open','High','Low', 'Volume', 'Change %') ), pivoted as ( select * from stock_history pivot(sum(stock_value) for indicator_name in ('Close', 'Open','High','Low', 'Volume', 'Change %') ) as p( company_symbol, company_name, stock_exchange_name, stock_date, data_source_name, close, open, high, low, volume,change ) ) select * from pivoted
ここで、「dbt_utils
のpivot
マクロ」と「Snowflakeのpivot
関数」2つのパターンについてクエリを見てきました。いろいろな見方があると思いますが、以下の理由から「dbt_utils
のpivot
マクロ」の方が汎用性が高いと思います。
pivot
マクロとget_columns
マクロを組み合わせる場合、引数に対象列、model名、参照する値を持つ列、を入れるだけでよい- Snowflakeの関数の場合、値を1つ1つ指定しないといけないが、dbtの
pivot
マクロの場合は動的に値を取得してピボットしてくれる
ref()について
先程のピボットに関するクエリの中で、select * from {{ ref('stg_knoema_stock_history') }}
のように、ref()
という記述があったと思います。このref()
を使用することにより、動的に参照するテーブルを変えることが出来ます。
dbtでmodelを記述する際は、基本的に参照先のテーブル名を直接指定することはバッドプラクティスです。ref()やsource()を使用しましょう!
ピボットを用いた中間modelの実行
先程作成したmodelを実行してみます。
コマンドラインで、dbt run -m +int_knoema_stock_history
と入力し実行しましょう。+
を入れておくことで、対象のmodelの親modelも併せて実行されます。
実行後、下図のように親modelも実行されていることがわかると思います。
ここで、modelを介して作られたテーブルに対してクエリを発行してみましょう。dbt Cloud上、あるいはSnowflake上で、下記のクエリをコピーして、スキーマ名を自身のものに変更した上で実行してみてください。
SELECT * FROM pc_dbt_db.<dev_schema>_marts.int_knoema_stock_history WHERE company_symbol = 'AAPL' AND stock_date = '2021-03-01'
狙い通り、1行でClose、Low、Open、Highが並んでいればOKです!
為替レートに関する中間modelの定義
続いて、為替レートに関する中間modelを定義していきます。
まず、新しいファイルを作成します。New File
を押して、models/marts/core/intermediate/int_fx_rates.sql
に書き換えた上で新しいファイルを作成します。
ファイルが出来たら、以下のクエリをコピーして貼り付けて、右上のsave
を押しましょう。
{{ config( materialized='view', tags=["hourly"] ) }} select * from {{ ref('stg_knoema_fx_rates') }} where indicator_name = 'Close' and frequency = 'D' and exchange_date > '2016-01-01'
この貼り付けたコードで見るべきはconfig
とその中身です。
まずconfig
について、このmodelに対するオプションを設定することが出来ます。
materialized
:対象のmodel実行後、どのオブジェクトで生成するかを定義する。dbt_project.yml
でも定義がされている場合には、このmodel内でのconfig
の内容が優先されます。tags
:対象のmodelに対してタグを付与できます。タグは、プロジェクトのうちの一部を実行するために用いたり、modelをグループ化する際に便利です。
config
についての設定は、こちらのDocも併せてご覧ください。
為替レートに関する中間modelの実行
ということで、早速定義した中間modelを実行してみます。
コマンドラインで、dbt run -m tag:hourly
と入力して実行してください。-m tag:
で、指定したtagに関係するmodelだけを実行可能です。
実行すると、先程タグで指定したmodelだけ実行されているのがわかると思います。
為替とトレード履歴を併せるmodelの定義
続いて、これまでに作成した2つのmodelを用いたmodelを作っていきます。
まず、ファイルを作成します。New File
を押して、models/marts/core/intermediate/int_stock_history_major_currency.sql
に書き換えた上で新しいファイルを作成します。
続いて、下記のクエリをコピーして貼り付けし、右上のsave
を押します。
with stock_history as ( select * from {{ ref('int_knoema_stock_history')}} ), fx_rates as ( select * from {{ ref('int_fx_rates') }} ), fx_rates_gdp as ( select * from fx_rates where currency = 'USD/GBP' ), fx_rates_eur as ( select * from fx_rates where currency = 'USD/EUR' ), joined as ( select stock_history.*, fx_rates_gdp.exchange_value * stock_history."Open" as gbp_open, fx_rates_gdp.exchange_value * stock_history."High" as gbp_high, fx_rates_gdp.exchange_value * stock_history."Low" as gbp_low, fx_rates_gdp.exchange_value * stock_history."Close" as gbp_close, fx_rates_eur.exchange_value * stock_history."Open" as eur_open, fx_rates_eur.exchange_value * stock_history."High" as eur_high, fx_rates_eur.exchange_value *stock_history."Low" as eur_low, fx_rates_eur.exchange_value * stock_history."Close" as eur_close from stock_history left join fx_rates_gdp on stock_history.stock_date = fx_rates_gdp.exchange_date left join fx_rates_eur on stock_history.stock_date = fx_rates_eur.exchange_date ) select * from joined
続いて、このmodelについて親modelと併せて実行してみます。
コマンドラインにdbt run --model +int_stock_history_major_currency
と入力し、実行してください。
親モデル含めて、5つのモデルが実行されていればOKです!
ドキュメントの生成
dbtは、Snowflakeのinformation_schema
と同様にdbtプロジェクトの情報を取り込んで、各種modelやデータの情報を持つ静的なWebページを生成することができます。このプロジェクトに関するドキュメント、と言ってもよいでしょう。
このドキュメントは、各カラム、タグ、テストだけでなく、ソースコードに関するすべての重要な情報が含まれているため、社内のチームと情報を共有するためにはとても優れています。インタラクティブなDAGを提供するので、modelのリネージ全体像を見ることができます。
更に、コマンド一つで生成可能なため、運用負荷もかかりません。
では、早速ドキュメントを生成していきましょう。
コマンドラインでdbt docs generate
と入力し、実行してください。
完了したら、画面左上のview docs
が押せるようになったはずです。こちらを押してみてください。
すると、対象のプロジェクトに関するドキュメントが別タブで立ち上がると思います。
下図のようにmodelごとのカラム情報や所有者、テーブルのサイズを確認したり、右下のアイコンを押すことで、リネージを確認することも可能です。
おまけ:9章までの内容のコミット
これは公式ページ上には載ってないのですが、中間モデルを生成したということで一度Commitしておきましょう。
次回
Snowflakeをより使いこなそう! Advent Calendar 2021、次回の22日目では、「Snowflake×dbtを試してみた~Part4:パイプライン構築編その2~」というタイトルで執筆します。お楽しみに!